package com.netty.constant;


import com.alibaba.fastjson.JSONObject;
import com.netty.model.ReplyBody;
import com.netty.model.SentBody;
import com.netty.model.proto.SentBodyProto;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * 用于存放用户channel信息仅用于点对点发送
 */
public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>> {

    private static final Collection<Channel> EMPTY_LIST = new LinkedList<>();

    private final transient ChannelFutureListener remover = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            future.removeListener(this);
            remove(future.channel());
        }
    };

    protected String getKey(Channel channel) {
        return channel.attr(Constants.SessionConfig.UID).get();
    }

    public void remove(Channel channel) {

        String uid = getKey(channel);

        if (uid == null) {
            return;
        }

        Collection<Channel> collections = getOrDefault(uid, EMPTY_LIST);

        collections.remove(channel);

        if (collections.isEmpty()) {
            remove(uid);
        }
    }

    public void add(Channel channel) {

        String uid = getKey(channel);

        if (uid == null || !channel.isActive()) {
            return;
        }

        channel.closeFuture().addListener(remover);

        Collection<Channel> collections = this.putIfAbsent(uid, new ConcurrentLinkedQueue<>(Collections.singleton(channel)));
        if (collections != null) {
            collections.add(channel);
        }

        if (!channel.isActive()) {
            remove(channel);
        }
    }

    public void writep2p(String key, ReplyBody reply) {
        find(key).forEach(channel -> {
            String channelKey = channel.attr(Constants.SessionConfig.CHANNEL).get();
            switch (channelKey) {
                case Constants.ImserverConfig.SOCKET: //socket 消息
                    SentBodyProto.Model.Builder sentBody = SentBodyProto.Model.newBuilder();
                    sentBody.setKey(reply.getKey());
                    sentBody.setTimestamp(System.currentTimeMillis());
                    sentBody.putData("body", reply.get("body"));
                    channel.writeAndFlush(sentBody);
                    break;
                case Constants.ImserverConfig.WEBSOCKET:
                    channel.writeAndFlush(reply);
                    break;
                case Constants.ImserverConfig.UDPSOCKET: //websocket 消息
                    SentBody body = new SentBody();
                    body.setKey(reply.getKey());
                    body.setTimestamp(System.currentTimeMillis());
                    body.put("body",reply.get("body"));
                    channel.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(JSONObject.toJSONString(body), CharsetUtil.UTF_8),
                            new InetSocketAddress(channel.attr(Constants.SessionConfig.IPS).get(),channel.attr(Constants.SessionConfig.PORT).get())));
                    break;
            }
        });
    }

    public void writep2p(String key, ReplyBody reply, Predicate<Channel> matcher) {
        find(key).stream().filter(matcher).forEach(channel -> {
            String channelKey = channel.attr(Constants.SessionConfig.CHANNEL).get();
            switch (channelKey) {
                case Constants.ImserverConfig.SOCKET: //socket 消息
                    SentBodyProto.Model.Builder sentBody = SentBodyProto.Model.newBuilder();
                    sentBody.setKey(reply.getKey());
                    sentBody.setTimestamp(System.currentTimeMillis());
                    sentBody.putData("body", reply.get("body"));
                    channel.writeAndFlush(sentBody);
                    break;
                case Constants.ImserverConfig.WEBSOCKET:
                    channel.writeAndFlush(reply);
                    break;
                case Constants.ImserverConfig.UDPSOCKET: //websocket 消息
                    SentBody body = new SentBody();
                    body.setKey(reply.getKey());
                    body.setTimestamp(System.currentTimeMillis());
                    body.put("body",reply.get("body"));
                    channel.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(JSONObject.toJSONString(body), CharsetUtil.UTF_8),
                            new InetSocketAddress(channel.attr(Constants.SessionConfig.IPS).get(),channel.attr(Constants.SessionConfig.PORT).get())));
                    break;
            }
        });
    }

    public Collection<Channel> find(String key) {
        return this.getOrDefault(key, EMPTY_LIST);
    }

    public Collection<Channel> find(String key, String... channel) {
        List<String> channels = Arrays.asList(channel);
        return find(key).stream().filter(item -> channels.contains(item.attr(Constants.SessionConfig.CHANNEL).get())).collect(Collectors.toList());
    }

    /**
     * 获取某个组id中左右channel信息
     * @param groupId
     * @return
     */
    public void writep2Group(String groupId,ReplyBody reply) {
        this.forEach((s, channels) -> {
            channels.stream().filter(item -> Objects.equals(item.attr(Constants.SessionConfig.GROUP_ID).get(),groupId)).forEach(channel->{
                String channelKey = channel.attr(Constants.SessionConfig.CHANNEL).get();
                switch (channelKey) {
                    case Constants.ImserverConfig.SOCKET: //socket 消息
                        SentBodyProto.Model.Builder sentBody = SentBodyProto.Model.newBuilder();
                        sentBody.setKey(reply.getKey());
                        sentBody.setTimestamp(System.currentTimeMillis());
                        sentBody.putData("body", reply.get("body"));
                        channel.writeAndFlush(sentBody);
                        break;
                    case Constants.ImserverConfig.WEBSOCKET:
                        channel.writeAndFlush(reply);
                        break;
                    case Constants.ImserverConfig.UDPSOCKET: //websocket 消息
                        SentBody body = new SentBody();
                        body.setKey(reply.getKey());
                        body.setTimestamp(System.currentTimeMillis());
                        body.put("body",reply.get("body"));
                        channel.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(JSONObject.toJSONString(body), CharsetUtil.UTF_8),
                                new InetSocketAddress(channel.attr(Constants.SessionConfig.IPS).get(),channel.attr(Constants.SessionConfig.PORT).get())));
                        break;
                }
            });
        });
    }

    public void writep2All(ReplyBody reply){
        this.forEach((s, channels) -> {
            channels.forEach(channel -> {
                //点对点发送
                String channelKey = channel.attr(Constants.SessionConfig.CHANNEL).get();
                switch (channelKey) {
                    case Constants.ImserverConfig.SOCKET: //socket 消息
                        SentBodyProto.Model.Builder sentBody = SentBodyProto.Model.newBuilder();
                        sentBody.setKey(reply.getKey());
                        sentBody.setTimestamp(System.currentTimeMillis());
                        sentBody.putData("body", reply.get("body"));
                        channel.writeAndFlush(sentBody);
                        break;
                    case Constants.ImserverConfig.WEBSOCKET:
                        channel.writeAndFlush(reply);
                        break;
                    case Constants.ImserverConfig.UDPSOCKET: //websocket 消息
                        SentBody body = new SentBody();
                        body.setKey(reply.getKey());
                        body.setTimestamp(System.currentTimeMillis());
                        body.put("body",reply.get("body"));
                        channel.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(JSONObject.toJSONString(body), CharsetUtil.UTF_8),
                                new InetSocketAddress(channel.attr(Constants.SessionConfig.IPS).get(),channel.attr(Constants.SessionConfig.PORT).get())));
                        break;
                }
            });
        });

    }

    public List<Map<String,Object>> findAll(){
        List<Map<String,Object>> maps = new ArrayList<>();
        this.forEach((s, channels) -> {
            channels.forEach(channel -> {
                Map<String,Object> map = new HashMap<>();
                map.put("channl",channel.attr(Constants.SessionConfig.ID).get());
                map.put("user",channel.attr(Constants.SessionConfig.UID).get());
                maps.add(map);
            });
        });
        return maps;
    }

}